package main

import (
	"fmt"
	broadcastbroker "gitee.com/sqxwww/xmachinery/brokers/redis"
	"github.com/RichardKnop/machinery/v2"
	redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
	"github.com/RichardKnop/machinery/v2/config"
	eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
	"github.com/RichardKnop/machinery/v2/tasks"
	"sync"
	"time"
)

var server *machinery.Server

func init() {
	server, _ = startServer()
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			worker()
		}()
	}
	time.Sleep(time.Second)
	singature, _ := tasks.NewSignature("hello", nil)
	//设置广播任务头
	singature.Headers = map[string]interface{}{"broadcastTask": ""}
	server.SendTask(singature)
	wg.Wait()
}

func startServer() (*machinery.Server, error) {
	cnf := &config.Config{
		DefaultQueue:    "machinery_tasks",
		ResultsExpireIn: 3600,
		Redis: &config.RedisConfig{
			MaxIdle:                3,
			IdleTimeout:            240,
			ReadTimeout:            15,
			WriteTimeout:           15,
			ConnectTimeout:         15,
			NormalTasksPollPeriod:  1000,
			DelayedTasksPollPeriod: 500,
		},
	}
	//使用支持广播的broker
	broker := broadcastbroker.New(cnf, []string{"localhost:6379"}, 0)
	backend := redisbackend.New(cnf, "localhost:6379", "", "", 0)
	lock := eagerlock.New()
	server := machinery.NewServer(cnf, broker, backend, lock)

	tasks := map[string]interface{}{
		"hello": func() error {
			fmt.Println("hello broadcast broker")
			return nil
		},
	}

	return server, server.RegisterTasks(tasks)
}

func worker() error {
	consumerTag := "machinery_worker"
	worker := server.NewWorker(consumerTag, 0)
	errorsChan := make(chan error)

	worker.LaunchAsync(errorsChan)

	return <-errorsChan
}
